]> git.saurik.com Git - apple/libdispatch.git/blob - examples/Dispatch Samples/netcat.c
libdispatch-84.5.tar.gz
[apple/libdispatch.git] / examples / Dispatch Samples / netcat.c
1 /*
2 * Copyright (c) 2008 Apple Inc. All rights reserved.
3 *
4 * @APPLE_DTS_LICENSE_HEADER_START@
5 *
6 * IMPORTANT: This Apple software is supplied to you by Apple Computer, Inc.
7 * ("Apple") in consideration of your agreement to the following terms, and your
8 * use, installation, modification or redistribution of this Apple software
9 * constitutes acceptance of these terms. If you do not agree with these terms,
10 * please do not use, install, modify or redistribute this Apple software.
11 *
12 * In consideration of your agreement to abide by the following terms, and
13 * subject to these terms, Apple grants you a personal, non-exclusive license,
14 * under Apple's copyrights in this original Apple software (the "Apple Software"),
15 * to use, reproduce, modify and redistribute the Apple Software, with or without
16 * modifications, in source and/or binary forms; provided that if you redistribute
17 * the Apple Software in its entirety and without modifications, you must retain
18 * this notice and the following text and disclaimers in all such redistributions
19 * of the Apple Software. Neither the name, trademarks, service marks or logos of
20 * Apple Computer, Inc. may be used to endorse or promote products derived from
21 * the Apple Software without specific prior written permission from Apple. Except
22 * as expressly stated in this notice, no other rights or licenses, express or
23 * implied, are granted by Apple herein, including but not limited to any patent
24 * rights that may be infringed by your derivative works or by other works in
25 * which the Apple Software may be incorporated.
26 *
27 * The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO
28 * WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED
29 * WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30 * PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN
31 * COMBINATION WITH YOUR PRODUCTS.
32 *
33 * IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR
34 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
35 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR
37 * DISTRIBUTION OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF
38 * CONTRACT, TORT (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF
39 * APPLE HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * @APPLE_DTS_LICENSE_HEADER_END@
42 */
43
44 #include <dispatch/dispatch.h>
45 #include <Block.h>
46
47 #include <stdlib.h>
48 #include <unistd.h>
49 #include <err.h>
50 #include <syslog.h>
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <sys/fcntl.h>
54 #include <errno.h>
55 #include <netdb.h>
56 #include <stdbool.h>
57 #include <sysexits.h>
58 #include <stdio.h>
59 #include <string.h>
60 #include <sys/param.h>
61 #include <sys/ioctl.h>
62 #include <mach/mach.h>
63 #include <pthread.h>
64
// Uncomment to make dlog() dump its argument through dispatch_debug().
// #define DEBUG 1

/*
 * dlog(a): with DEBUG set, passes `a` to dispatch_debug() using the
 * stringified argument as the message; otherwise it is an empty statement.
 */
#if !DEBUG
#define dlog(a) do { } while(0)
#else
#define dlog(a) dispatch_debug(a, #a)
#endif
72
73 void usage(void);
74 void *run_block(void *);
75 void setup_fd_relay(int netfd /* bidirectional */,
76 int infd /* local input */,
77 int outfd /* local output */,
78 void (^finalizer_block)(void));
79 void doreadwrite(int fd1, int fd2, char *buffer, size_t len);
80
81 #define BUFFER_SIZE 1099
82
/*
 * Entry point.
 *
 * Options (see the getopt string below):
 *   -4 / -6   restrict address lookup to IPv4 / IPv6 (mutually exclusive)
 *   -D        log up to LOG_DEBUG
 *   -v        log up to LOG_INFO
 *   -d        do not read from stdin
 *   -h        print usage and exit
 *   -l        listen instead of connecting
 *   -k        with -l, keep listening after a connection finishes
 *   -n        numeric host/service only (no lookups)
 *   -u        use UDP instead of TCP
 *   -p port   local source port (connect mode only)
 *   -s addr   local source address (connect mode only)
 *
 * Positional arguments are "hostname servname"; in listen mode a single
 * "servname" is also accepted.
 *
 * In listen mode one socket per resolved address is set up asynchronously
 * and served from the serial queue `dq`.  In connect mode each resolved
 * address is tried in turn until one connects; if none does the process
 * exits with status 1.  On success the function never returns: it parks in
 * dispatch_main() and the process ends via exit() from a finalizer block.
 */
int main(int argc, char *argv[]) {

	int ch;
	bool use_v4_only = false, use_v6_only = false;
	bool debug = false, no_stdin = false;
	bool keep_listening = false, do_listen = false;
	bool do_lookups = true, verbose = false;
	bool do_udp = false, do_bind_ip = false, do_bind_port = false;
	// Initialised to NULL so that the blocks below never capture an
	// indeterminate pointer when the corresponding option/argument is absent.
	const char *hostname = NULL, *servname = NULL;
	int ret;
	struct addrinfo hints, *aires, *aires0;
	const char *bind_hostname = NULL, *bind_servname = NULL;

	dispatch_queue_t dq;
	dispatch_group_t listen_group = NULL;

	while ((ch = getopt(argc, argv, "46Ddhklnvup:s:")) != -1) {
		switch (ch) {
			case '4':
				use_v4_only = true;
				break;
			case '6':
				use_v6_only = true;
				break;
			case 'D':
				debug = true;
				break;
			case 'd':
				no_stdin = true;
				break;
			case 'h':
				usage();
				break;
			case 'k':
				keep_listening = true;
				break;
			case 'l':
				do_listen = true;
				break;
			case 'n':
				do_lookups = false;
				break;
			case 'v':
				verbose = true;
				break;
			case 'u':
				do_udp = true;
				break;
			case 'p':
				do_bind_port = true;
				bind_servname = optarg;
				break;
			case 's':
				do_bind_ip = true;
				bind_hostname = optarg;
				break;
			case '?':
			default:
				usage();
				break;
		}
	}

	argc -= optind;
	argv += optind;

	if (use_v4_only && use_v6_only) {
		errx(EX_USAGE, "-4 and -6 specified");
	}

	if (keep_listening && !do_listen) {
		errx(EX_USAGE, "-k specified but no -l");
	}

	if (do_listen && (do_bind_ip || do_bind_port)) {
		errx(EX_USAGE, "-p or -s option with -l");
	}

	if (do_listen) {
		if (argc >= 2) {
			hostname = argv[0];
			servname = argv[1];
		} else if (argc >= 1) {
			// listen on the wildcard address
			hostname = NULL;
			servname = argv[0];
		} else {
			errx(EX_USAGE, "No service name provided");
		}
	} else {
		if (argc >= 2) {
			hostname = argv[0];
			servname = argv[1];
		} else {
			errx(EX_USAGE, "No hostname and service name provided");
		}
	}

	openlog(getprogname(), LOG_PERROR|LOG_CONS, LOG_DAEMON);
	setlogmask(debug ? LOG_UPTO(LOG_DEBUG) : verbose ? LOG_UPTO(LOG_INFO) : LOG_UPTO(LOG_ERR));

	dq = dispatch_queue_create("netcat", NULL);
	listen_group = dispatch_group_create();

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = use_v4_only ? PF_INET : (use_v6_only ? PF_INET6 : PF_UNSPEC);
	hints.ai_socktype = do_udp ? SOCK_DGRAM : SOCK_STREAM;
	hints.ai_protocol = do_udp ? IPPROTO_UDP : IPPROTO_TCP;
	hints.ai_flags = (!do_lookups ? AI_NUMERICHOST | AI_NUMERICSERV : 0) | (do_listen ? AI_PASSIVE : 0);

	ret = getaddrinfo(hostname, servname, &hints, &aires0);
	if (ret) {
		// hostname is legitimately NULL in listen mode; never hand NULL to %s
		errx(1, "getaddrinfo(%s, %s): %s", hostname ? hostname : "(null)", servname, gai_strerror(ret));
	}

	for (aires = aires0; aires; aires = aires->ai_next) {
		if (do_listen) {
			// asynchronously set up the socket
			dispatch_retain(dq); // balanced by a release on every exit path of the block
			dispatch_group_async(listen_group,
								 dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
								 ^{
				int s, val = 1;
				dispatch_source_t ds;

				s = socket(aires->ai_family, aires->ai_socktype, aires->ai_protocol);
				if (s < 0) {
					warn("socket");
					dispatch_release(dq);
					return;
				}

				if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, sizeof(val)) < 0) {
					warn("Could not set SO_REUSEADDR");
				}

				if(setsockopt(s, SOL_SOCKET, SO_REUSEPORT, (const char *)&val, sizeof(val)) < 0) {
					warn("Could not set SO_REUSEPORT");
				}

				if(setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) < 0) {
					warn("Could not set SO_NOSIGPIPE");
				}

				if (bind(s, aires->ai_addr, aires->ai_addrlen) < 0) {
					warn("bind");
					close(s);
					dispatch_release(dq);
					return;
				}

				// listen(2) is still attempted for UDP as before, but only a
				// failure on a stream socket is treated as an error.
				if (listen(s, 2) < 0 && !do_udp) {
					warn("listen");
					close(s);
					dispatch_release(dq);
					return;
				}
				syslog(LOG_DEBUG, "listening on socket %d", s);
				ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, s, 0, dq);
				dispatch_source_set_event_handler(ds, ^{
					// got an incoming connection
					int s2, lfd = (int)dispatch_source_get_handle(ds);
					dispatch_queue_t listen_queue = dispatch_get_current_queue();

					// prevent further accept(2)s across multiple sources
					dispatch_retain(listen_queue);
					dispatch_suspend(listen_queue);

					if (do_udp) {
						// lfd is our socket, but let's connect in the reverse
						// direction to set up the connection fully
						char udpbuf[4];
						struct sockaddr_storage sockin;
						socklen_t socklen;
						ssize_t peeklen;
						int cret;

						socklen = sizeof(sockin);
						// MSG_PEEK: learn the sender's address without consuming the datagram
						peeklen = recvfrom(lfd, udpbuf, sizeof(udpbuf),
										   MSG_PEEK, (struct sockaddr *)&sockin, &socklen);
						if (peeklen < 0) {
							warn("recvfrom");
							dispatch_resume(listen_queue);
							dispatch_release(listen_queue);
							return;
						}

						cret = connect(lfd, (struct sockaddr *)&sockin, socklen);
						if (cret < 0) {
							warn("connect");
							dispatch_resume(listen_queue);
							dispatch_release(listen_queue);
							return;
						}

						s2 = lfd;
						syslog(LOG_DEBUG, "accepted socket %d", s2);
					} else {
						s2 = accept(lfd, NULL, NULL);
						if (s2 < 0) {
							warn("accept");
							dispatch_resume(listen_queue);
							dispatch_release(listen_queue);
							return;
						}
						syslog(LOG_DEBUG, "accepted socket %d -> %d", lfd, s2);
					}


					setup_fd_relay(s2, no_stdin ? -1 : STDIN_FILENO, STDOUT_FILENO, ^{
						// for UDP s2 is the listening socket itself, so keep it open
						if (!do_udp) {
							close(s2);
						}
						dispatch_resume(listen_queue);
						dispatch_release(listen_queue);
						if (!keep_listening) {
							exit(0);
						}
					});
				});
				dispatch_resume(ds);
				dispatch_release(dq);
			});
		} else {
			// synchronously try each address to try to connect
			__block bool did_connect = false;

			dispatch_sync(dq, ^{
				int s, val = 1;

				s = socket(aires->ai_family, aires->ai_socktype, aires->ai_protocol);
				if (s < 0) {
					warn("socket");
					return;
				}

				if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, sizeof(val)) < 0) {
					warn("Could not set SO_REUSEADDR");
				}

				if(setsockopt(s, SOL_SOCKET, SO_REUSEPORT, (const char *)&val, sizeof(val)) < 0) {
					warn("Could not set SO_REUSEPORT");
				}

				if(setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) < 0) {
					warn("Could not set SO_NOSIGPIPE");
				}

				if (do_bind_port || do_bind_ip) {
					struct addrinfo bhints, *bind_aires = NULL;
					int bret;
					in_port_t bport;

					memset(&bhints, 0, sizeof(bhints));
					bhints.ai_family = aires->ai_family;
					bhints.ai_socktype = aires->ai_socktype;
					bhints.ai_protocol = aires->ai_protocol;
					bhints.ai_flags = (do_bind_ip ? AI_NUMERICHOST : 0) | (do_bind_port ? AI_NUMERICSERV : 0) | AI_PASSIVE;

					bret = getaddrinfo(bind_hostname, bind_servname, &bhints, &bind_aires);
					if (bret) {
						// On failure getaddrinfo() yields no result list, so there
						// is nothing to free here.
						warnx("getaddrinfo(%s, %s): %s",
							  bind_hostname ? bind_hostname : "(null)",
							  bind_servname ? bind_servname : "(null)",
							  gai_strerror(bret));
						close(s);
						return;
					}

					switch(bind_aires->ai_family) {
						case PF_INET:
							bport = ((struct sockaddr_in *)bind_aires->ai_addr)->sin_port;
							break;
						case PF_INET6:
							bport = ((struct sockaddr_in6 *)bind_aires->ai_addr)->sin6_port;
							break;
						default:
							bport = htons(0);
							break;
					}

					// ports below IPPORT_RESERVED go through bindresvport_sa()
					if (ntohs(bport) > 0 && ntohs(bport) < IPPORT_RESERVED) {
						bret = bindresvport_sa(s, (struct sockaddr *)bind_aires->ai_addr);
					} else {
						bret = bind(s, bind_aires->ai_addr, bind_aires->ai_addrlen);
					}

					if (bret < 0) {
						warn("bind");
						close(s);
						freeaddrinfo(bind_aires);
						return;
					}

					freeaddrinfo(bind_aires);
				}

				if (connect(s, aires->ai_addr, aires->ai_addrlen) < 0) {
					syslog(LOG_INFO, "connect to %s port %s (%s) failed: %s",
						   hostname,
						   servname,
						   aires->ai_protocol == IPPROTO_TCP ? "tcp" : aires->ai_protocol == IPPROTO_UDP ? "udp" : "unknown",
						   strerror(errno));
					close(s);
					return;
				}

				syslog(LOG_INFO, "Connection to %s %s port [%s] succeeded!",
					   hostname,
					   servname,
					   aires->ai_protocol == IPPROTO_TCP ? "tcp" : aires->ai_protocol == IPPROTO_UDP ? "udp" : "unknown");
				did_connect = true;

				if (do_udp) {
					// netcat sends a few bytes to set up the connection
					doreadwrite(-1, s, "XXXX", 4);
				}

				setup_fd_relay(s, no_stdin ? -1 : STDIN_FILENO, STDOUT_FILENO, ^{
					close(s);
					exit(0);
				});
			});

			if (did_connect) {
				// leaves aires non-NULL, which the check after the loop relies on
				break;
			}
		}
	}

	// the listener setup blocks dereference aires, so wait before freeing the list
	dispatch_group_wait(listen_group, DISPATCH_TIME_FOREVER);
	freeaddrinfo(aires0);

	if (!do_listen && aires == NULL) {
		// got to the end of the address list without connecting
		exit(1);
	}

	dispatch_main();

	return 0;
}
424
/*
 * Write the two-line option synopsis to stderr, then terminate the
 * process with EX_USAGE.  Does not return.
 */
void usage(void)
{
	const char *progname = getprogname();

	fprintf(stderr,
			"Usage: %s [-4] [-6] [-D] [-d] [-h] [-k] [-l] [-n] [-v]\n"
			" \t[-u] [-p <source_port>] [-s <source_ip>]\n",
			progname);
	exit(EX_USAGE);
}
431
432 void *run_block(void *arg)
433 {
434 void (^b)(void) = (void (^)(void))arg;
435
436 b();
437
438 _Block_release(arg);
439
440 return NULL;
441 }
442
/*
 * Read up to `len` bytes from fd1 into `buffer` and write everything that
 * was read to fd2, retrying short or interrupted writes.
 *
 * fd1     descriptor to read from, or -1 to skip the read and send the
 *         first `len` bytes already in `buffer`
 * fd2     descriptor to write to
 * buffer  scratch space of at least `len` bytes (only read when fd1 is -1)
 * len     maximum number of bytes to transfer
 *
 * Returns without writing on EOF, or when the read fails with EINTR/EAGAIN
 * (the caller is expected to be invoked again).  Any other read or write
 * error terminates the process through err(1, ...).
 */
void doreadwrite(int fd1, int fd2, char *buffer, size_t len) {
	ssize_t readBytes, writeBytes, totalWriteBytes;

	if (fd1 != -1) {
		syslog(LOG_DEBUG, "trying to read %zu bytes from fd %d", len, fd1);
		readBytes = read(fd1, buffer, len);
		if (readBytes < 0) {
			if (errno == EINTR || errno == EAGAIN) {
				/* can't do anything now, hope we get called again */
				int saved_errno = errno; /* strerror() may itself change errno */
				syslog(LOG_DEBUG, "error read fd %d: %s (%d)", fd1, strerror(saved_errno), saved_errno);
				return;
			} else {
				err(1, "read fd %d", fd1);
			}
		} else if (readBytes == 0) {
			syslog(LOG_DEBUG, "EOF on fd %d", fd1);
			return;
		}
		syslog(LOG_DEBUG, "read %zd bytes from fd %d", readBytes, fd1);
	} else {
		readBytes = (ssize_t)len;
		syslog(LOG_DEBUG, "read buffer has %zd bytes", readBytes);
	}

	totalWriteBytes = 0;
	/* do/while on purpose: write(2) is issued at least once, even for zero bytes */
	do {
		writeBytes = write(fd2, buffer+totalWriteBytes, (size_t)(readBytes-totalWriteBytes));
		if (writeBytes < 0) {
			if (errno == EINTR || errno == EAGAIN) {
				/* transient: go round again and retry the same range */
				continue;
			} else {
				err(1, "write fd %d", fd2);
			}
		}
		syslog(LOG_DEBUG, "wrote %zd bytes to fd %d", writeBytes, fd2);
		totalWriteBytes += writeBytes;

	} while (totalWriteBytes < readBytes);

	return;
}
489
490 /*
491 * We set up dispatch sources for netfd and infd.
492 * Since only one callback is called at a time per-source,
493 * we don't need any additional serialization, and the network
494 * and infd could be read from at the same time.
495 */
496 void setup_fd_relay(int netfd /* bidirectional */,
497 int infd /* local input */,
498 int outfd /* local output */,
499 void (^finalizer_block)(void))
500 {
501 dispatch_source_t netsource = NULL, insource = NULL;
502
503 dispatch_queue_t teardown_queue = dispatch_queue_create("teardown_queue", NULL);
504
505 void (^finalizer_block_copy)(void) = _Block_copy(finalizer_block); // release after calling
506 void (^cancel_hander)(dispatch_source_t source) = ^(dispatch_source_t source){
507 dlog(source);
508 dlog(teardown_queue);
509
510 /*
511 * allowing the teardown queue to become runnable will get
512 * the teardown block scheduled, which will cancel all other
513 * sources and call the client-supplied finalizer
514 */
515 dispatch_resume(teardown_queue);
516 dispatch_release(teardown_queue);
517 };
518 void (^event_handler)(dispatch_source_t source, int wfd) = ^(dispatch_source_t source, int wfd) {
519 int rfd = dispatch_source_get_handle(source);
520 size_t bytesAvail = dispatch_source_get_data(source);
521 char *buffer;
522
523 syslog(LOG_DEBUG, "dispatch source %d -> %d has %lu bytes available",
524 rfd, wfd, bytesAvail);
525 if (bytesAvail == 0) {
526 dlog(source);
527 dispatch_source_cancel(source);
528 return;
529 }
530 buffer = malloc(BUFFER_SIZE);
531 doreadwrite(rfd,wfd, buffer, MIN(BUFFER_SIZE, bytesAvail+2));
532 free(buffer);
533 };
534
535 /*
536 * Suspend this now twice so that neither source can accidentally resume it
537 * while we're still setting up the teardown block. When either source
538 * gets an EOF, the queue is resumed so that it can teardown the other source
539 * and call the client-supplied finalizer
540 */
541 dispatch_suspend(teardown_queue);
542 dispatch_suspend(teardown_queue);
543
544 if (infd != -1) {
545 dispatch_retain(teardown_queue); // retain so that we can resume in this block later
546
547 dlog(teardown_queue);
548
549 // since the event handler serializes, put this on a concurrent queue
550 insource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, infd, 0, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
551 dispatch_source_set_event_handler(insource, ^{ event_handler(insource, netfd); });
552 dispatch_source_set_cancel_handler(insource, ^{ cancel_hander(insource); });
553 dispatch_resume(insource);
554 dlog(insource);
555 }
556
557 dispatch_retain(teardown_queue); // retain so that we can resume in this block later
558
559 dlog(teardown_queue);
560
561 // since the event handler serializes, put this on a concurrent queue
562 netsource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, netfd, 0, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
563 dispatch_source_set_event_handler(netsource, ^{ event_handler(netsource, outfd); });
564 dispatch_source_set_cancel_handler(netsource, ^{ cancel_hander(netsource); });
565 dispatch_resume(netsource);
566 dlog(netsource);
567
568 dispatch_async(teardown_queue, ^{
569 syslog(LOG_DEBUG, "Closing connection on fd %d -> %d -> %d", infd, netfd, outfd);
570
571 if (insource) {
572 dlog(insource);
573 dispatch_source_cancel(insource);
574 dispatch_release(insource); // matches initial create
575 dlog(insource);
576 }
577
578 dlog(netsource);
579 dispatch_source_cancel(netsource);
580 dispatch_release(netsource); // matches initial create
581 dlog(netsource);
582
583 dlog(teardown_queue);
584
585 finalizer_block_copy();
586 _Block_release(finalizer_block_copy);
587 });
588
589 /* Resume this once so their either source can do the second resume
590 * to start the teardown block running
591 */
592 dispatch_resume(teardown_queue);
593 dispatch_release(teardown_queue); // matches initial create
594 dlog(teardown_queue);
595 }
596